package com.atguigu.gmall.realtime.utils

import java.util

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.http.HttpHost
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.builder.SearchSourceBuilder

import scala.collection.mutable.ListBuffer

/**
 * Elasticsearch utility object, used to query data from ES and to write data into ES.
 */
object MyEsUtils {

  /**
   * Ad-hoc manual check: queries the `mid` field of a hard-coded index and
   * prints how many values came back, followed by the values themselves.
   */
  def main(args: Array[String]): Unit = {
    val list: List[String] = searchField("gmall_dau_info_2022-07-07" ,"mid")
    println(list.size)
    println(list)
  }

  /**
   * Upper bound on the number of hits requested by [[searchField]].
   *
   * NOTE(review): Elasticsearch rejects `from + size` above the index's
   * `index.max_result_window` (10000 by default), so this value presumably
   * relies on that setting being raised for the queried indices - confirm.
   */
  private final val MaxSearchHits = 100000

  /** Client shared by every method of this object; built once when the object is initialised. */
  private val esClient : RestHighLevelClient = create()

  /**
   * Creates an Elasticsearch client.
   *
   * The host and port are read through `MyPropsUtil` using the
   * `MyConfig.ES_HOST` and `MyConfig.ES_PORT` keys. Every call builds a new
   * client instance.
   *
   * @return a new high-level REST client
   */
  def create(): RestHighLevelClient = {
    val httpHost: HttpHost = new HttpHost(MyPropsUtil(MyConfig.ES_HOST) , MyPropsUtil(MyConfig.ES_PORT).toInt)
    val restClientBuilder: RestClientBuilder = RestClient.builder(httpHost)
    new RestHighLevelClient(restClientBuilder)
  }

  /**
   * Fetches the values of a single field from the documents of an index.
   *
   * @param indexName index to query
   * @param fieldName field whose values are collected
   * @return the string form of `fieldName` for every returned hit that
   *         carries the field (at most `MaxSearchHits` hits are requested);
   *         an empty list when the index does not exist
   */
  def searchField(indexName : String , fieldName : String ): List[String] = {
    // Searching a missing index would fail, so check for existence first.
    val getIndexRequest: GetIndexRequest = new GetIndexRequest(indexName)
    val exists: Boolean = esClient.indices().exists(getIndexRequest , RequestOptions.DEFAULT)
    if (!exists) {
      // An empty list instead of null keeps callers free of null checks
      // (a caller that still tests `!= null` keeps working).
      Nil
    } else {
      val searchSourceBuilder: SearchSourceBuilder = new SearchSourceBuilder()
      // _source filtering: only ship the requested field back.
      searchSourceBuilder.fetchSource(fieldName, null)
      // size: number of hits to return.
      searchSourceBuilder.size(MaxSearchHits)

      val searchRequest: SearchRequest = new SearchRequest(indexName)
      searchRequest.source(searchSourceBuilder)

      val searchResponse: SearchResponse = esClient.search(searchRequest , RequestOptions.DEFAULT)
      val hits: Array[SearchHit] = searchResponse.getHits.getHits

      hits.toList.flatMap { hit =>
        val source: util.Map[String, AnyRef] = hit.getSourceAsMap
        // Skip hits that have no source or lack the field, rather than
        // failing on a null dereference.
        Option(source)
          .flatMap(fields => Option(fields.get(fieldName)))
          .map(_.toString)
          .toList
      }
    }
  }

  /**
   * Writes a batch of documents into an index with a single bulk request.
   *
   * Does nothing when `docs` is null or empty, because an empty bulk request
   * is rejected by the client's request validation. Per-document failures
   * reported in the bulk response are written to stderr; they are not thrown.
   *
   * @param indexName index to write to
   * @param docs      documents to write, as (document id, document object) pairs
   */
  def bulkSave(indexName : String , docs :  List[(String,AnyRef)]): Unit = {
    if (docs != null && docs.nonEmpty) {
      // Field-based serialisation config, built once per batch rather than once per document.
      val serializeConfig: SerializeConfig = new SerializeConfig(true)

      val bulkRequest: BulkRequest = new BulkRequest(indexName)
      for ((docId, data) <- docs) {
        val indexRequest: IndexRequest = new IndexRequest()
        indexRequest.id(docId)
        indexRequest.source(JSON.toJSONString(data, serializeConfig) , XContentType.JSON)
        bulkRequest.add(indexRequest)
      }

      val bulkResponse = esClient.bulk(bulkRequest , RequestOptions.DEFAULT)
      // A bulk call can succeed as a whole while individual items fail;
      // surface those failures instead of dropping them silently.
      if (bulkResponse.hasFailures) {
        System.err.println(
          s"MyEsUtils.bulkSave: bulk write to index '$indexName' had failures: " +
            bulkResponse.buildFailureMessage()
        )
      }
    }
  }

}
